feat(datafusion): add catalog-level temporary table support #309

Merged
JingsongLi merged 2 commits into apache:main from JingsongLi:tempTable
May 8, 2026
Contributor

@JingsongLi JingsongLi commented May 6, 2026

Purpose

This PR introduces catalog-level temporary table and view support for the DataFusion integration. Previously, temporary tables were not scoped to a specific Paimon catalog. This change allows users to create, query, and drop temporary tables and views within any registered Paimon catalog using standard SQL syntax.

This PR also adds a Python API:

import pyarrow as pa

batch = pa.record_batch([[1, 2], ["alice", "bob"]], names=["id", "name"])

ctx.register_batch("paimon.default.my_temp", batch)

batches = ctx.sql("SELECT * FROM paimon.default.my_temp")

# Drop it via SQL when no longer needed
ctx.sql("DROP TEMPORARY TABLE paimon.default.my_temp")

Brief change log

  • Catalog-level temp storage: Added temp_tables to PaimonCatalogProvider (backed by RwLock<HashMap<String, MemorySchemaProvider>>) so that temporary tables are organized per database inside each catalog.
  • SchemaProvider resolution: Updated PaimonSchemaProvider to check temporary tables first, falling back to real Paimon tables. This ensures temp tables can shadow persistent tables without naming conflicts.
  • SQLContext APIs: Added register_temp_table, deregister_temp_table, and temp_table_exist for programmatic temp table management with flexible TableReference names.
  • SQL parsing: Extended SQLContext::sql to intercept and handle:
    • CREATE TEMPORARY TABLE (col TYPE, ...)
    • CREATE TEMPORARY TABLE ... AS SELECT ...
    • CREATE TEMPORARY VIEW ... AS SELECT ...
    • DROP TEMPORARY TABLE ...
  • Time-travel queries: Refactored VERSION AS OF / TIMESTAMP AS OF execution to register a UUID-based temporary table internally, avoiding global namespace pollution.
  • Docs & tests: Renamed datafusion.md to sql.md, documented all new SQL syntax, and added comprehensive integration tests.
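The per-database temp storage and the temp-first lookup described in the change log can be sketched in a few lines of plain Rust. This is only an illustration under stated assumptions: `CatalogSketch`, `Resolved`, and the `String` payloads are hypothetical stand-ins, and the real code stores `MemorySchemaProvider` values and DataFusion table providers rather than strings.

```rust
use std::collections::HashMap;
use std::sync::RwLock;

/// Where a resolved table came from (hypothetical type for this sketch).
#[derive(Debug, PartialEq)]
enum Resolved {
    Temp(String),
    Persistent(String),
}

/// Hypothetical stand-in for a catalog provider.
struct CatalogSketch {
    // database name -> (table name -> payload); mirrors the per-database layout
    temp_tables: RwLock<HashMap<String, HashMap<String, String>>>,
    // (database, table) -> payload; stands in for real persistent tables
    persistent: HashMap<(String, String), String>,
}

impl CatalogSketch {
    fn new(persistent: HashMap<(String, String), String>) -> Self {
        Self { temp_tables: RwLock::new(HashMap::new()), persistent }
    }

    fn register_temp(&self, db: &str, table: &str, payload: &str) {
        // Recover the map even if another thread panicked while holding the lock.
        let mut dbs = self.temp_tables.write().unwrap_or_else(|e| e.into_inner());
        dbs.entry(db.to_string())
            .or_default()
            .insert(table.to_string(), payload.to_string());
    }

    fn deregister_temp(&self, db: &str, table: &str) -> Option<String> {
        let mut dbs = self.temp_tables.write().unwrap_or_else(|e| e.into_inner());
        dbs.get_mut(db)?.remove(table)
    }

    /// Temp tables are checked first, so they shadow persistent tables.
    fn resolve(&self, db: &str, table: &str) -> Option<Resolved> {
        let dbs = self.temp_tables.read().unwrap_or_else(|e| e.into_inner());
        if let Some(payload) = dbs.get(db).and_then(|tables| tables.get(table)) {
            return Some(Resolved::Temp(payload.clone()));
        }
        self.persistent
            .get(&(db.to_string(), table.to_string()))
            .cloned()
            .map(Resolved::Persistent)
    }
}
```

Registering a temp table named like an existing persistent table makes `resolve` return the temp entry; deregistering it makes the persistent table visible again.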

Tests

  • Added crates/integrations/datafusion/tests/sql_context_tests.rs covering:
    • CREATE TEMPORARY TABLE with column definitions
    • CREATE TEMPORARY TABLE ... AS SELECT
    • CREATE TEMPORARY VIEW ... AS SELECT
    • DROP TEMPORARY TABLE / DROP TEMPORARY VIEW
    • IF NOT EXISTS semantics
    • Temp table shadowing and lifecycle
  • Updated existing test suites (read_tables.rs, merge_into_tests.rs, delete_tests.rs, etc.) to align with the new catalog resolution logic.

API and Format

  • New public APIs on SQLContext:
    • register_temp_table(name, Arc)
    • deregister_temp_table(name) -> Option<Arc>
    • temp_table_exist(name) -> bool
  • New public APIs on PaimonCatalogProvider:
    • register_temp_table(database, table_name, table)
    • deregister_temp_table(database, table_name)
    • temp_table_exist(database, table_name) -> bool
  • No changes to the underlying Paimon storage format.

Documentation

  • Renamed docs/src/datafusion.md to docs/src/sql.md and updated mkdocs.yml.
  • Documented temporary table SQL syntax, SQLContext usage examples, and catalog registration patterns.

@JingsongLi JingsongLi changed the title from "feat(datafusion): add catalog-level temporary table support" to "[WIP] feat(datafusion): add catalog-level temporary table support" May 6, 2026
@JingsongLi JingsongLi changed the title from "[WIP] feat(datafusion): add catalog-level temporary table support" to "feat(datafusion): add catalog-level temporary table support" May 7, 2026
@JingsongLi JingsongLi force-pushed the tempTable branch 2 times, most recently from ceacfed to 75c2fec May 7, 2026 11:37
@jerry-024 jerry-024 left a comment

Thanks for the comprehensive work on catalog-scoped temp tables! The core design is solid — scoping temp tables per-catalog, the TempTableTracker RAII pattern, and migrating all tests from datafusion.public to the Paimon catalog path are all good improvements.

Below are inline comments on the issues I found, prioritized by severity.

P2 — Suggest fixing (non-blocking):

  1. looks_like_create_table also matches CREATE TEMP VIEW (sql_context.rs:1273): The TEMPORARY/TEMP skip doesn't distinguish TABLE from VIEW. If a CREATE TEMPORARY VIEW ever contains Paimon-specific syntax like PARTITIONED BY, the pre-parser would incorrectly try to extract partition keys. This is not currently triggered, but it is a latent bug.

  2. bindings/python/README.md — Usage section removed without replacement: Users reading the README on GitHub/PyPI lose the quick-start guide. Consider adding a link to project-description.md or restoring a minimal example.

  3. Python register_batch lacks a negative test: There is no test for calling register_batch with an invalid or unknown catalog name.
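For item 1, one possible shape of a check that tells TABLE apart from VIEW is sketched below. It is a hypothetical rewrite, not the code in sql_context.rs: the function name `is_create_table_statement` is invented for illustration, and the sketch deliberately ignores variants such as OR REPLACE and leading comments.

```rust
/// Returns true only for `CREATE [TEMPORARY | TEMP] TABLE ...`.
/// Hypothetical helper; whitespace-tokenizes the statement and
/// compares keywords case-insensitively.
fn is_create_table_statement(sql: &str) -> bool {
    let mut tokens = sql.split_whitespace().map(|t| t.to_ascii_uppercase());

    if tokens.next().as_deref() != Some("CREATE") {
        return false;
    }

    // Skip the optional TEMPORARY / TEMP modifier, then require TABLE.
    let mut next = tokens.next();
    if matches!(next.as_deref(), Some("TEMPORARY") | Some("TEMP")) {
        next = tokens.next();
    }
    next.as_deref() == Some("TABLE")
}
```

With this shape, `CREATE TEMPORARY VIEW v AS SELECT 1` is rejected before any partition-key extraction would run.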

///
/// Returns the table name, version/tag value, and byte range of the full clause.
fn extract_version_as_of(sql: &str) -> Option<VersionAsOfInfo> {
let lower = sql.to_lowercase();

P0 — Must fix (Java Paimon compatibility)

lower.find("version as of ") is a naive string search that will fail on valid SQL patterns that work in Java Paimon (Flink/Spark):

  1. JOIN two time-travel tables: SELECT * FROM t1 VERSION AS OF 1 JOIN t2 VERSION AS OF 2 ON ... — only the first match is found; the second is silently ignored.
  2. Subquery / CTE: WITH cte AS (SELECT * FROM t VERSION AS OF 1) SELECT * FROM cte: the clause_range replacement corrupts the SQL structure.
  3. String literal false positive: WHERE note = 'version as of 1' — matches inside a string constant.
  4. Table alias after clause: FROM t VERSION AS OF 1 AS tt — replacement range doesn't account for the alias.

In Java Paimon, all of these are handled correctly by the SQL parser. Users migrating SQL from Flink/Spark will hit unexpected failures.

Suggestion: Consider a two-pass approach — first extract and strip time-travel clauses with awareness of quoted strings and parenthesis nesting, then hand the cleaned SQL to Parser::parse_sql. At minimum, add a quoted-string skip to avoid false positives inside string literals.
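The minimum fix suggested above (skipping quoted strings) combined with finding every occurrence rather than only the first could look roughly like the following. This is a sketch only: `find_outside_string_literals` is a hypothetical name, it handles single-quoted literals with `''` escapes, and it does not attempt parenthesis nesting, comments, or alias handling.

```rust
/// Returns the byte offset of every case-insensitive occurrence of `keyword`
/// in `sql` that lies outside a single-quoted string literal.
/// Hypothetical helper for illustration.
fn find_outside_string_literals(sql: &str, keyword: &str) -> Vec<usize> {
    let hay = sql.as_bytes();
    let needle = keyword.as_bytes();
    let mut hits = Vec::new();
    let mut in_string = false;
    let mut i = 0;

    while i < hay.len() {
        if hay[i] == b'\'' {
            // Inside a literal, '' is an escaped quote, not a terminator.
            if in_string && hay.get(i + 1) == Some(&b'\'') {
                i += 2;
                continue;
            }
            in_string = !in_string;
            i += 1;
            continue;
        }
        if !in_string
            && !needle.is_empty()
            && i + needle.len() <= hay.len()
            && hay[i..i + needle.len()].eq_ignore_ascii_case(needle)
        {
            hits.push(i);
            i += needle.len();
            continue;
        }
        i += 1;
    }
    hits
}
```

For the JOIN case in item 1 this returns two offsets, one per time-travel clause, and for the string-literal case in item 3 it returns none.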

);

// Register the provider under the UUID temp table name
self.register_temp_table(uuid_name.as_str(), provider)?;

P1 — Temp table leak on panic

If self.ctx.sql() panics, this temp table is never cleaned up. This PR already introduces TempTableTracker as an RAII guard for exactly this pattern (used in execute_cow_delete_once, execute_cow_merge_once, etc.), but it's not used here.

Suggested fix:

let mut tracker = TempTableTracker::new(self);
self.register_temp_table(uuid_name.as_str(), provider)?;
tracker.register(&uuid_name);

let result = self.ctx.sql(&rewritten_sql).await;
// tracker auto-deregisters on drop
result
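Why a drop guard helps here can be shown with a self-contained sketch. The names `Registry` and `TempTableGuard` are hypothetical and are not the PR's `TempTableTracker`; unlike the suggested fix above, this guard both registers and records the name in one call, purely to keep the example short.

```rust
use std::collections::HashSet;
use std::sync::{Mutex, MutexGuard};

/// Hypothetical stand-in for the context that owns temp tables.
#[derive(Default)]
struct Registry {
    tables: Mutex<HashSet<String>>,
}

impl Registry {
    fn lock(&self) -> MutexGuard<'_, HashSet<String>> {
        // Keep working even if a panicking thread poisoned the mutex.
        self.tables.lock().unwrap_or_else(|e| e.into_inner())
    }

    fn contains(&self, name: &str) -> bool {
        self.lock().contains(name)
    }
}

/// RAII guard: every name registered through it is removed when it is dropped.
struct TempTableGuard<'a> {
    registry: &'a Registry,
    names: Vec<String>,
}

impl<'a> TempTableGuard<'a> {
    fn new(registry: &'a Registry) -> Self {
        Self { registry, names: Vec::new() }
    }

    fn register(&mut self, name: &str) {
        self.registry.lock().insert(name.to_string());
        self.names.push(name.to_string());
    }
}

impl Drop for TempTableGuard<'_> {
    // Runs on normal scope exit, on early return via `?`, and during unwinding.
    fn drop(&mut self) {
        let mut tables = self.registry.lock();
        for name in self.names.drain(..) {
            tables.remove(&name);
        }
    }
}
```

Because `Drop` also runs while a panic unwinds, a table registered through the guard does not outlive a failed query. This only holds when panics unwind; with `panic = "abort"` no destructors run.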

Comment thread docs/src/sql.md Outdated
- `"database.my_table"` — uses the current catalog with the specified database
- `"catalog.database.my_table"` — fully qualified

### register_mem_table

P1 — Non-existent API

register_mem_table is documented here but doesn't exist in the code — the PR implements register_temp_table instead. Either add register_mem_table as a convenience wrapper, or update the docs to reference register_temp_table.

) -> DFResult<()> {
// Check if a temp table with this name already exists
{
let databases = self.temp_tables.read().unwrap_or_else(|e| e.into_inner());

P1 — TOCTOU between existence check and registration

This read-lock check and the actual registration via get_or_create_temp_database (line 254) are done in separate lock acquisitions. Between them, there's a blocking block_on_with_runtime call (shadow warning, lines 235-252) where another thread could register a table with the same name.

Since register_temp_table is a public API, consider holding the write lock for the entire check-then-register operation, or using MemorySchemaProvider::register_table's return value to detect the conflict atomically.
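The first option (one write lock held across the check and the insert) can be illustrated with the standard library's entry API. This is a simplified sketch, not the PR's code: `TempTables` is a hypothetical type, the payload is a plain `u32`, and the blocking shadow-warning step is assumed to have been moved outside the critical section.

```rust
use std::collections::hash_map::Entry;
use std::collections::HashMap;
use std::sync::RwLock;

/// Hypothetical per-database temp table store.
#[derive(Default)]
struct TempTables {
    inner: RwLock<HashMap<String, HashMap<String, u32>>>,
}

impl TempTables {
    /// Check-then-register under a single write lock, so no other thread
    /// can slip in between the existence check and the insert.
    fn register(&self, db: &str, table: &str, payload: u32) -> Result<(), String> {
        let mut dbs = self.inner.write().unwrap_or_else(|e| e.into_inner());
        match dbs.entry(db.to_string()).or_default().entry(table.to_string()) {
            Entry::Occupied(_) => Err(format!(
                "temporary table '{}.{}' already exists",
                db, table
            )),
            Entry::Vacant(slot) => {
                slot.insert(payload);
                Ok(())
            }
        }
    }
}
```

If several threads race to register the same name, exactly one call returns `Ok` and the rest get the "already exists" error.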

let (catalog, _catalog_name, identifier) = self.resolve_table_name_from_ref(&table_ref)?;

let paimon_table = catalog
.get_table(&identifier)

P0 — Must fix (dynamic options bypass)

catalog.get_table(&identifier) here loads the table directly from the Paimon catalog, bypassing the normal PaimonSchemaProvider path. This means session-scoped dynamic options set via SET 'paimon.xxx' = ... are not applied to time-travel queries.

For example:

SET 'paimon.blob-as-descriptor' = 'true';
SELECT * FROM paimon.default.my_table VERSION AS OF 1;  -- blob-as-descriptor NOT applied

The normal table loading path in PaimonSchemaProvider::table() applies dynamic_options before creating the provider. Time-travel should do the same — either pass self.dynamic_options to copy_with_options alongside the time-travel options, or go through the schema provider.
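The first option amounts to merging two option maps before the table is copied. A minimal sketch of that merge follows; the function name `effective_options`, the unprefixed key spelling, the use of `scan.snapshot-id` as the time-travel key, and the precedence rule (time-travel options win on conflict) are all assumptions made for illustration, not behavior confirmed by the PR.

```rust
use std::collections::HashMap;

/// Combines session-scoped dynamic options with per-query time-travel options.
/// Assumption for this sketch: on a key conflict the time-travel value wins,
/// because it comes from the query text itself.
fn effective_options(
    dynamic: &HashMap<String, String>,
    time_travel: &HashMap<String, String>,
) -> HashMap<String, String> {
    let mut merged = dynamic.clone();
    merged.extend(time_travel.iter().map(|(k, v)| (k.clone(), v.clone())));
    merged
}
```

The merged map would then be what gets handed to `copy_with_options`, so that a setting such as blob-as-descriptor reaches the time-travel read as well.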

let statements = Parser::parse_sql(&dialect, &rewritten_sql)
let sql_lower = rewritten_sql.to_lowercase();
let has_time_travel =
sql_lower.contains("version as of") || sql_lower.contains("timestamp as of");

P0 — Must fix (false positive on string literals)

This contains() check is the entry gate that short-circuits all subsequent SQL parsing. Any SQL containing the substring version as of or timestamp as of — even inside a string literal or comment — will skip Parser::parse_sql entirely and enter the regex-based time-travel path.

Examples that would be misrouted:

SELECT * FROM t WHERE note = 'version as of 1'
-- comment: version as of something
INSERT INTO t VALUES ('timestamp as of now')

This needs at minimum a quoted-string-aware scan before deciding to treat the query as time-travel. One approach: skip over single-quoted strings ('...') and SQL comments (-- / /* */) before checking for the keyword.
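One way to build such a gate is to blank out literals and comments first and only then run the substring check. The sketch below uses hypothetical names (`mask_literals_and_comments`, `mentions_time_travel`) and makes simplifying assumptions: it covers single-quoted strings with `''` escapes, `--` line comments, and non-nested `/* */` block comments, and it does not handle double-quoted identifiers or dialect-specific escapes.

```rust
/// Replaces each string literal and comment with a single space so that
/// keyword checks only see real SQL tokens. Hypothetical helper.
fn mask_literals_and_comments(sql: &str) -> String {
    let chars: Vec<char> = sql.chars().collect();
    let mut out = String::with_capacity(sql.len());
    let mut i = 0;

    while i < chars.len() {
        let c = chars[i];
        let next = chars.get(i + 1).copied();

        if c == '\'' {
            // Skip to the closing quote, treating '' as an escaped quote.
            i += 1;
            while i < chars.len() {
                if chars[i] == '\'' {
                    if chars.get(i + 1) == Some(&'\'') {
                        i += 2;
                        continue;
                    }
                    i += 1;
                    break;
                }
                i += 1;
            }
            out.push(' ');
        } else if c == '-' && next == Some('-') {
            // Line comment: skip up to (not including) the newline.
            while i < chars.len() && chars[i] != '\n' {
                i += 1;
            }
            out.push(' ');
        } else if c == '/' && next == Some('*') {
            // Block comment: skip past the closing */ if there is one.
            i += 2;
            while i < chars.len() && !(chars[i] == '*' && chars.get(i + 1) == Some(&'/')) {
                i += 1;
            }
            i = (i + 2).min(chars.len());
            out.push(' ');
        } else {
            out.push(c);
            i += 1;
        }
    }
    out
}

/// Entry gate: true only if a time-travel keyword appears in real SQL text.
fn mentions_time_travel(sql: &str) -> bool {
    let masked = mask_literals_and_comments(sql).to_lowercase();
    masked.contains("version as of") || masked.contains("timestamp as of")
}
```

All three misrouted examples above return false under this gate, while a real `VERSION AS OF` or `TIMESTAMP AS OF` clause still returns true.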

self.ctx.sql(sql).await
}
Statement::Truncate(truncate) => self.handle_truncate_table(truncate).await,
Statement::CreateView(create_view) => self.handle_create_view(create_view).await,

P0 — Must fix (breaks non-Paimon catalogs)

This intercepts all CREATE VIEW statements, and the Statement::Drop arm below (line 385) intercepts all DROP TABLE / DROP VIEW. Before this PR, these would fall through to self.ctx.sql(sql) and be handled by DataFusion normally.

Now:

  • CREATE VIEW datafusion.public.my_view AS ... → errors with "CREATE VIEW (non-temporary) is not supported" (line 1113), even though DataFusion can handle it.
  • DROP TABLE datafusion.public.x → resolve_catalog_and_table looks up only Paimon catalogs and fails with "Unknown catalog 'datafusion'".

Suggestion: Only intercept these statements when the resolved catalog is a registered Paimon catalog. If the catalog name doesn't exist in self.catalogs, fall through to self.ctx.sql(sql).await for DataFusion's default handling.
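The suggested routing rule can be sketched as a small pure function. Everything here is hypothetical (`Route`, `route_for`, and the naive split on `.` that ignores quoted identifiers); it only illustrates the decision, assuming a three-part name carries its own catalog and shorter names use the current catalog.

```rust
use std::collections::HashSet;

/// Who should handle a CREATE VIEW / DROP statement (hypothetical type).
#[derive(Debug, PartialEq)]
enum Route {
    /// Catalog is a registered Paimon catalog: intercept.
    Paimon,
    /// Anything else: fall through to DataFusion's default handling.
    DataFusion,
}

/// Decides the route from the statement's object name.
/// Assumption: names are unquoted and dot-separated.
fn route_for(object_name: &str, current_catalog: &str, paimon_catalogs: &HashSet<String>) -> Route {
    let parts: Vec<&str> = object_name.split('.').collect();
    // Only a fully qualified name (catalog.database.object) names its catalog.
    let catalog = if parts.len() == 3 { parts[0] } else { current_catalog };

    if paimon_catalogs.contains(catalog) {
        Route::Paimon
    } else {
        Route::DataFusion
    }
}
```

Under this rule `datafusion.public.my_view` is routed to DataFusion instead of failing with "Unknown catalog", while `paimon.default.my_view` is still intercepted.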

Contributor Author

JingsongLi commented May 8, 2026

Thanks @jerry-024 for the review. I have addressed the comments.

@jerry-024 jerry-024 left a comment

+1

@JingsongLi JingsongLi merged commit 0d55ac7 into apache:main May 8, 2026
8 checks passed
2 participants